home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Language/OS - Multiplatform Resource Library
/
LANGUAGE OS.iso
/
presto
/
presto10.lha
/
src
/
scheduler.C
< prev
next >
Wrap
C/C++ Source or Header
|
1991-12-11
|
11KB
|
503 lines
/*
* scheduler.c
*
* Implementation of shared scheduler class. All process
* objects interact with the scheduler object when trying to
* get ready threads off the ready queue (see getreadythread()).
*
* ThreadPool trivially defines the base class for the shared
* ready thread pool.
*
* ThreadPoolQueue defines the default scheduling discipline.
* Very simple, no priorities.... nothing fancy.
*
* Last modified: 06/19/91
* by: pbd
* reason: Reversed order in which affinity is
* set. Previous method was to set affinity,
* and then fork children. New method forks
* children, and then sets affinity.
* Hence, new code in Scheduler::invoke that
* used to be in Process::Process (the
* ctor for the root process).
*
* Last modified: 11/07/89
* by: jef
* reason: Fix problem when numprocessors > NUMPROCS.
* This caused PRESTO to write right off the
* end of sc_p_numprocs array, trashing
* sc_p_numschedulers field, causing huge
* numbers of processes to be created, using
* up all job slots on system.
*
* Last modified: 1/1/88
* by: bnb
* reason: take care for preschedthreads
*
* Last modified: 12/21/87
* by: bnb
* reason: add these comments
*
*/
#define _SCHEDULER_C
#include <stddef.h>
#include <sys/types.h>
#include <signal.h>
#include <osfcn.h>
#include <stream.h>
#include "presto.h"
extern Main *MAIN;
//
// preschedthreads is the queue where threads that get created
// before the scheduler exists are placed. When the scheduler is
// invoked (see Scheduler::invoke), each queued thread is handed to
// Scheduler::resume(), after which the queue is deleted and this
// pointer is reset to 0.
// This allows threads to be created in static constructors without
// requiring that the entire system be in place.
//
// Starts out as 0; nothing in this file allocates the queue.
//
shared_t ThreadQ *preschedthreads = 0;
//
// Definition of default ThreadPool{Queue} type known to scheduler.
//
// Base-class constructor: the abstract pool keeps no state to set up.
ThreadPool::ThreadPool()
{
    // intentionally empty
}
// Base-class destructor: nothing is owned here, so nothing is released.
ThreadPool::~ThreadPool()
{
    // intentionally empty
}
//
// Base-class get(): not a usable implementation. It reports an error;
// the null return only exists to give the function a value in case
// error() comes back.
//
Thread*
ThreadPool::get()
{
    Thread* none = 0;

    error("Base threadpool get");
    /* NOTREACHED */
    return none;
}
//
// Base-class insert(): not a usable implementation. It reports an
// error naming this operation (the message previously said "get",
// which pointed at the wrong function when diagnosing a failure).
//
//   t - the thread the caller tried to insert; it is not stored.
//
void
ThreadPool::insert(Thread* t)
{
    error("Base threadpool insert");
    // NOTREACHED
    t = t;      // Satisfy cfront (silences the unused-argument warning)
}
//
// Base-class size(): not a usable implementation. It reports an
// error; the value 1 is returned only if error() comes back.
//
int
ThreadPool::size()
{
    const int placeholder = 1;

    error("Base threadpool size");
    /* NOTREACHED */
    return placeholder;
}
//
// Build the default pool: a single ThreadQ constructed with TS_READY.
// The queue is owned by this object and released in the destructor.
//
ThreadPoolQueue::ThreadPoolQueue()
{
    ThreadQ* readyq = new ThreadQ(TS_READY);

    tp_tq = readyq;
}
// Release the ThreadQ that the constructor allocated.
ThreadPoolQueue::~ThreadPoolQueue()
{
    ThreadQ* doomed = tp_tq;

    delete doomed;
}
//
// Take one thread from the underlying queue.
// Returns whatever ThreadQ::get() returns; callers in this file treat
// a null result as "nothing available".
//
Thread*
ThreadPoolQueue::get()
{
    return tp_tq->get();
}
//
// Add thread t to the pool by appending it to the underlying queue.
//
void
ThreadPoolQueue::insert(Thread* t)
{
    tp_tq->append(t);
}
//
// Number of threads in the pool, as reported by the underlying
// queue's length().
//
int
ThreadPoolQueue::size()
{
    int queued = tp_tq->length();

    return queued;
}
//
// The main scheduler system
//
//
// Construct the shared scheduler.
//
//   numschedulers - requested number of scheduler processes; values
//                   above NUMPROCS are cut down to NUMPROCS with a
//                   message on cerr.
//   quantum       - stored in sc_quantum; when PREEMPT is compiled in
//                   and the value is non-zero, sigpreempt_init() is
//                   called.
//
// Only slot 0 of the process table is filled in here; the remaining
// processes are created later by Scheduler::invoke().
//
Scheduler::Scheduler(int numschedulers, int quantum)
{
#ifdef PREEMPT
    void sigpreempt_init();
#endif /* PREEMPT */

    //
    // Never accept more schedulers than PRESTO has table slots for.
    //
    if (numschedulers <= NUMPROCS) {
        sc_p_numschedulers = numschedulers;
    } else {
        sc_p_numschedulers = NUMPROCS;
        cerr << "'numprocessors' of " << numschedulers << " too big - ";
        cerr << "using " << NUMPROCS << "\n";
    }

#ifdef DEBUG_STARTUP
    cout << "making scheduler, sc_p_numschedulers = "
         << sc_p_numschedulers << "\n";
#endif /* DEBUG_STARTUP */

    //
    // Give every scheduler slot its own (initially fresh) ready pool.
    //
    int slot = 0;
    while (slot < sc_p_numschedulers) {
        sc_t_ready[slot] = new ThreadPoolQueue;
        slot++;
    }

    //
    // Reuse an existing process context if there is one (the user can
    // create his own in Main::init(), which lets the constructor be
    // virtualized); otherwise make the root process ourselves.
    //
    if (thisproc != 0)
        sc_p_procs[0] = thisproc;
    else
        sc_p_procs[0] = new Process(P_ROOT, 0);

    sc_p_activeschedulers = 1;
    sc_p_busybits = 0;
    sc_lock = new Spinlock;
    sc_quantum = quantum;

#ifdef PREEMPT
    if (sc_quantum) {
        sigpreempt_init();
    }
#endif /* PREEMPT */

#ifdef DEBUG_STARTUP
    cout << "done making scheduler\n";
#endif /* DEBUG_STARTUP */
}
//
// Scheduler invocation function.
// Create sc_p_numschedulers threads and bind each one of them
// to a process invocation. Schedule those threads to start running
// which will in turn run a new scheduler on some new processor.
//
// In case we have any preschedthreads, schedule them here.
//
// Returns the number of active schedulers: the unchanged
// sc_p_activeschedulers when sc_p_numschedulers is negative,
// otherwise sc_p_numschedulers (after it has been limited to the
// number of CPUs reported by cpus_online()), which is also stored
// into sc_p_activeschedulers.
//
int
Scheduler::invoke()
{
int cpusonline = cpus_online();
int pid;
#ifdef DEBUG_STARTUP
cout << "scheduler_starter thread - in sched->invoke ()\n";
#endif /* DEBUG_STARTUP */
#ifdef PREEMPT
extern void sigpreempt_beginclock(struct timeval *q);
extern int preemption_enabled;
#endif /* PREEMPT */
// A negative count means there is nothing to start.
if (sc_p_numschedulers < 0)
return sc_p_activeschedulers;
// Never run more schedulers than there are CPUs online.
// NOTE(review): the constructor allocated a ready pool for every slot
// up to the original sc_p_numschedulers, and the destructor only walks
// up to the current value, so lowering it here appears to strand the
// pools in the higher slots -- confirm.
if (sc_p_numschedulers > cpusonline)
sc_p_numschedulers = cpusonline;
//
// Initialize the process pool
//
// NOTE(review): the meaning of the 0 argument is defined by
// initsighandlers(), which is not in this file -- confirm.
initsighandlers(0);
// Slots below sc_p_activeschedulers already have a process (the
// constructor fills slot 0), so start from the first empty slot.
for (pid = sc_p_activeschedulers;
pid < sc_p_numschedulers; pid++)
{
#define XNAME "proc_X"
// Build the name "proc_a", "proc_b", ... by overwriting the 'X'
// (index 5) with a letter derived from the slot number; slot 1
// yields 'a'.
// NOTE(review): pname is not freed here; presumably newprocess()
// keeps the pointer -- confirm.
char *pname = new char[sizeof(XNAME)];
strcpy(pname, XNAME);
pname[5] = pid + 'a' - 1;
sc_p_procs[pid] = thisproc->newprocess(pname,pid+thisproc->id());
if (sc_p_procs[pid]->state() & S_ERROR) {
// Process creation failed: report it, record how many slots
// were filled, and abort; the kill() is a fallback in case
// abort() comes back.
perror("Scheduler::new Process");
sc_p_numschedulers = --pid;
this->abort(SIGKILL);
kill(getpid(), SIGILL);
// not reached
}
sc_p_busybits |= 1<<pid; // assume busy
}
#ifdef sequent
#ifdef i386
//
// Set processor affinity if applicable.
// This takes effect for the scheduler processes only,
// all other thread processes having had their affinity
// set (if applicable) in Process::p_runchild()
//
if (MAIN->get_affinity ())
{
if (tmp_affinity (thisproc->id()) == -1)
cout << "unable to set affinity on root process "
<< thisproc->id() << "\n";
// else
// cout << "affinity set on root process "
// << thisproc->pid()
// << thisproc->id() << "\n";
// cout.flush ();
}
#endif /* i386 */
#endif /* sequent */
initsighandlers(1); // prepare parent to clean up
#ifdef PREEMPT
// Start the preemption clock, converting the quantum from
// milliseconds into a seconds/microseconds timeval.
if (sc_quantum) {
struct timeval q;
q.tv_sec = sc_quantum / 1000; // in msecs
q.tv_usec = (sc_quantum % 1000) * 1000; // 1msec = 1000usec
sigpreempt_beginclock(&q);
}
#endif /* PREEMPT */
//
// take care of threads created in the prescheduler era:
// hand each one to resume(), then discard the holding queue.
//
if (preschedthreads) {
Thread *t;
while (t=preschedthreads->get())
resume(t);
delete preschedthreads;
preschedthreads = 0;
}
#ifdef DEBUG_STARTUP
cout << "done with invoke() \n";
#endif /* DEBUG_STARTUP */
// Every requested scheduler is now considered active.
return (sc_p_activeschedulers = sc_p_numschedulers);
}
//
// Tear down the scheduler: release the per-slot ready pools and the
// spinlock that the constructor allocated.
//
// Only slots below the current sc_p_numschedulers are visited.
// The process objects in sc_p_procs[] are not touched here.
//
Scheduler::~Scheduler()
{
    for (int j = 0; j < sc_p_numschedulers; j++)
        delete sc_t_ready[j];

    // The constructor creates sc_lock with `new Spinlock`; it was
    // previously never released.
    delete sc_lock;
}
//
// Shut the scheduler down: stop the preemption clock (PREEMPT builds
// with a non-zero quantum), finish profiling (PROFILE builds), stop
// listening for dead children, then send R_RETURN to every child
// process and finally to the root process.
//
void
Scheduler::halt()
{
#ifdef PREEMPT
    extern void sigpreempt_stopclock();
    if (sc_quantum) {
        sigpreempt_stopclock();
    }
#endif /* PREEMPT */

#ifdef PROFILE
    QFinish();
#endif

    /*
     * Turn off SIGCHLD handling first -- otherwise we can race with
     * the "master" and get spurious "exit" messages from
     * schedulerReapChild().
     */
    initsighandlers(-1);

    /*
     * Ask the kids to quietly die (slot 0 is handled separately).
     */
    int child = 1;
    while (child < sc_p_numschedulers) {
        sc_p_procs[child]->request(R_RETURN /*R_DIE*/);
        child++;
    }

    /* Last of all, cause main to return. */
    sc_p_procs[0]->request(R_RETURN);
}
//
// Resume a thread within a process.
// A thread that has never run starts off a little bit differently;
// see threads.c for that path.
//
//   t - the thread to make runnable. Threads flagged TF_SCHEDULER are
//       rejected through t->error().
//
// The thread goes into the ready pool of the calling process; any
// process may later pick it up from there (see getreadythread()).
//
void
Scheduler::resume(Thread *t)
{
    if (t->flags() & TF_SCHEDULER) {
        t->error("Can't resume a scheduler thread\n");
    }

    t->isready();

    ThreadPool* mine = sc_t_ready[thisproc->id()];
    mine->insert(t);        // some process should grab me
}
//
// Find a thread for the calling process to run.
//
// The caller's own ready pool is tried first; if it is empty, the
// other slots are tried in increasing order, wrapping around, until a
// thread turns up or we are back at our own slot.
//
// Returns the thread found, or 0 when every pool was empty. In the
// empty case the root process halts the scheduler if busybits(0)
// reports no busy processes.
//
// Access to the busy bits is serialized with sc_lock so we can know
// when the system stops. Should really be more sophisticated as to
// determining when everything is done.
//
Thread*
Scheduler::getreadythread()
{
    int self = thisproc->id();
    Thread *found = sc_t_ready[self]->get();

    //
    // Nothing of our own: walk everyone else's pool.
    //
    for (int probe = self + 1; !found; probe++) {
        if (probe == sc_p_numschedulers)
            probe = 0;                      // wrap if necessary
        if (probe == self)
            break;                          // back where we started
        found = sc_t_ready[probe]->get();   // try this process
    }

    if (found) {
#ifdef PROFILE
        QThreadRun(found->getqThread());
#endif
        sc_lock->lock();
        (void)busybits(1);
        sc_lock->unlock();
        return found;
    }

    sc_lock->lock();
    int busy = busybits(0);
    sc_lock->unlock();

    //
    // If no busy procs, and we are the master....
    //
    if (busy == 0 && thisproc->isroot()) {
        this->halt();
    }
    return found;
}
void
Scheduler::error(char *s)
{
cerr << "Scheduler error " << s << "\n" << " in " << hex(long(this)) << "\n";
fatalerror();
}
//
// Return the ready pool that belongs to the calling process
// (the slot indexed by thisproc->id()).
//
ThreadPool*
Scheduler::getreadypool()
{
    ThreadPool* mine = sc_t_ready[thisproc->id()];

    return mine;
}
//
// Swap in a new ready pool for the calling process.
// Order of the switch is important here:
//   First, replace the old pool with the new pool.
//   Then, move any threads from the old pool to the new pool.
// The old pool is NOT deleted here; it is returned so the caller can
// dispose of it.
//
//   newtp - the pool to install; a null pointer is reported through
//           error().
//
// Returns the pool that was installed before the call. If newtp is
// already the installed pool, nothing is moved and it is returned
// unchanged (draining a pool into itself would never terminate).
//
ThreadPool*
Scheduler::setreadypool(ThreadPool* newtp)
{
    ThreadPool* oldpool;
    Thread* t;

    if (!newtp)
        error("Bad arg to setreadypool");

    oldpool = sc_t_ready[thisproc->id()];   // can still be active
    sc_t_ready[thisproc->id()] = newtp;

    // We expect to hold the only reference to oldpool by this point.
    // Skip the transfer when the same pool was handed back to us:
    // every get() would be undone by the following insert().
    if (oldpool && oldpool != newtp) {
        while ((t = oldpool->get()) != 0)
            sc_t_ready[thisproc->id()]->insert(t);
    }
    return oldpool;
}
//
// Dump the scheduler's state to stream s: its address, the ready-pool
// pointer of each active scheduler, the counters, the lock, and then
// one line per active process.
//
void
Scheduler::print(ostream& s)
{
    int n;

    s << form("(Scheduler):0x%x:\n", this);

    n = 0;
    while (n < sc_p_activeschedulers) {
        s << form ("sc_t_ready[%d]=0x%x, ", n, sc_t_ready[n]);
        n++;
    }

    s << form("sc_p_activescheduler=%d, sc_p_busybits=%d, sc_quantum=%d",
              sc_p_activeschedulers, sc_p_busybits, sc_quantum);
    s << sc_lock << "\n";

    n = 0;
    while (n < sc_p_activeschedulers) {
        s << sc_p_procs[n] << "\n";
        n++;
    }
}
//
// Global version of abort.
// When a scheduler object exists, let it do the aborting (it is given
// SIGILL); otherwise send SIGILL to our own process, which normally
// produces a core dump.
//
// The return type differs under __DECCXX (void instead of int); the
// trailing return statement is only there to satisfy the compiler.
//
#ifdef __DECCXX
void
#else
int
#endif
abort( void )
{
    if (!sched) {
        kill(getpid(), SIGILL);     // core dump
    } else {
        sched->abort(SIGILL);
        // NOT REACHED
    }
#ifdef __DECCXX
    return;     // not reached
#else
    return 0;   // not reached
#endif /* __DECCXX */
}